27.2 复杂系统设计与实现

19 分钟阅读

复杂系统设计概述#

Claude Code 可以帮助开发者设计和实现复杂的软件系统。通过系统化的设计方法和分阶段的实现策略,开发者可以利用 Claude Code 构建可扩展、可维护的复杂系统。

系统架构设计#

1. 微服务架构#

示例:设计微服务架构

用户请求: "设计一个电商系统的微服务架构"

Claude Code 生成的架构设计:

系统架构#

bash
│                        API Gateway                          │
│  (负载均衡、认证、路由、限流、监控)                           │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│                     │                     │
┌───────▼────────┐  ┌────────▼────────┐  ┌────────▼────────┐
│  User Service  │  │ Product Service │  │ Order Service   │
│  - 用户管理     │  │  - 商品管理      │  │  - 订单管理      │
│  - 认证授权     │  │  - 库存管理      │  │  - 支付处理      │
│  - 个人资料     │  │  - 分类管理      │  │  - 物流跟踪      │
└────────────────┘  └─────────────────┘  └─────────────────┘
│                     │                     │
└─────────────────────┼─────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│                     │                     │
┌───────▼────────┐  ┌────────▼────────┐  ┌────────▼────────┐
│ Payment Service│  │ Inventory      │  │ Notification    │
│  - 支付处理     │  │ Service        │  │ Service         │
│  - 退款处理     │  │  - 库存同步     │  │  - 邮件通知      │
│  - 交易记录     │  │  - 预警管理     │  │  - 短信通知      │
└────────────────┘  └─────────────────┘  └─────────────────┘
│
┌─────────▼─────────┐
│   Message Broker  │
│  (Kafka/RabbitMQ) │
└───────────────────┘
```
## 服务定义

### User Service (用户服务)
```python
python

# services/user_service/app.py
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager, create_access_token
import bcrypt

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/users_db'
app.config['JWT_SECRET_KEY'] = 'your-secret-key'

db = SQLAlchemy(app)
jwt = JWTManager(app)

class User(db.Model):
    """用户模型"""
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(255), nullable=False)
    name = db.Column(db.String(100), nullable=False)
    phone = db.Column(db.String(20))
    created_at = db.Column(db.DateTime, server_default=db.func.now())

    def set_password(self, password):
        """设置密码"""
        self.password_hash = bcrypt.hashpw(
            password.encode('utf-8'),
            bcrypt.gensalt()
        ).decode('utf-8')

    def check_password(self, password):
        """验证密码"""
        return bcrypt.checkpw(
            password.encode('utf-8'),
            self.password_hash.encode('utf-8')
        )

@app.route('/register', methods=['POST'])
def register():
    """用户注册"""
    data = request.get_json()

    user = User(
        email=data['email'],
        name=data['name'],
        phone=data.get('phone')
    )
    user.set_password(data['password'])

    db.session.add(user)
    db.session.commit()

    return jsonify({'user_id': user.id}), 201

@app.route('/login', methods=['POST'])
def login():
    """用户登录"""
    data = request.get_json()

    user = User.query.filter_by(email=data['email']).first()

    if not user or not user.check_password(data['password']):
        return jsonify({'error': 'Invalid credentials'}), 401

    access_token = create_access_token(identity=user.id)

    return jsonify({
        'access_token': access_token,
        'user': {
            'id': user.id,
            'email': user.email,
            'name': user.name
        }
    })

@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """获取用户信息"""
    user = User.query.get_or_404(user_id)
    return jsonify({
        'id': user.id,
        'email': user.email,
        'name': user.name,
        'phone': user.phone
    })

### Product Service (商品服务)

```python

# services/product_service/app.py
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from datetime import datetime

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/products_db'

db = SQLAlchemy(app)
migrate = Migrate(app, db)

class Product(db.Model):
"""商品模型"""
__tablename__ = 'products'

id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(200), nullable=False)
description = db.Column(db.Text)
price = db.Column(db.Float, nullable=False)
stock = db.Column(db.Integer, default=0)
category_id = db.Column(db.Integer, db.ForeignKey('categories.id'))
created_at = db.Column(db.DateTime, server_default=db.func.now())
updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

category = db.relationship('Category', backref='products')

class Category(db.Model):
"""分类模型"""
__tablename__ = 'categories'

id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'))

parent = db.relationship('Category', remote_side=[id], backref='children')

@app.route('/products', methods=['POST'])
def create_product():
"""创建商品"""
data = request.get_json()

product = Product(
name=data['name'],
description=data.get('description'),
price=data['price'],
stock=data.get('stock', 0),
category_id=data.get('category_id')
)

db.session.add(product)
db.session.commit()

return jsonify({'product_id': product.id}), 201

@app.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
"""获取商品详情"""
product = Product.query.get_or_404(product_id)
return jsonify({
'id': product.id,
'name': product.name,
'description': product.description,
'price': product.price,
'stock': product.stock,
'category': product.category.name if product.category else None
})

@app.route('/products/<int:product_id>/stock', methods=['PUT'])
def update_stock(product_id):
"""更新库存"""
data = request.get_json()

product = Product.query.get_or_404(product_id)
product.stock = data['stock']

db.session.commit()

return jsonify({'stock': product.stock})

### Order Service (订单服务)
```python
python

# services/order_service/app.py
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import requests

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/orders_db'

db = SQLAlchemy(app)

class Order(db.Model):
    """订单模型"""
    __tablename__ = 'orders'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, nullable=False)
    total_amount = db.Column(db.Float, nullable=False)
    status = db.Column(db.String(20), default='pending')
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

class OrderItem(db.Model):
    """订单项模型"""
    __tablename__ = 'order_items'

    id = db.Column(db.Integer, primary_key=True)
    order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
    product_id = db.Column(db.Integer, nullable=False)
    quantity = db.Column(db.Integer, nullable=False)
    price = db.Column(db.Float, nullable=False)

    order = db.relationship('Order', backref='items')

@app.route('/orders', methods=['POST'])
def create_order():
    """创建订单"""
    data = request.get_json()

    # 验证商品库存
    for item in data['items']:
        response = requests.get(
            f"http://product-service:5001/products/{item['product_id']}"
        )
        product = response.json()

        if product['stock'] < item['quantity']:
            return jsonify({
                'error': f'Insufficient stock for product {item["product_id"]}'
            }), 400

    # 创建订单
    order = Order(
        user_id=data['user_id'],
        total_amount=0
    )
    db.session.add(order)
    db.session.flush()

    # 创建订单项
    total_amount = 0
    for item in data['items']:
        order_item = OrderItem(
            order_id=order.id,
            product_id=item['product_id'],
            quantity=item['quantity'],
            price=item['price']
        )
        db.session.add(order_item)
        total_amount += item['price'] * item['quantity']

    order.total_amount = total_amount
    db.session.commit()

    # 扣减库存
    for item in data['items']:
        response = requests.get(
            f"http://product-service:5001/products/{item['product_id']}"
        )
        product = response.json()
        new_stock = product['stock'] - item['quantity']

        requests.put(
            f"http://product-service:5001/products/{item['product_id']}/stock",
            json={'stock': new_stock}
        )

    return jsonify({'order_id': order.id}), 201

@app.route('/orders/<int:order_id>', methods=['GET'])
def get_order(order_id):
    """获取订单详情"""
    order = Order.query.get_or_404(order_id)

    return jsonify({
        'id': order.id,
        'user_id': order.user_id,
        'total_amount': order.total_amount,
        'status': order.status,
        'items': [
            {
                'product_id': item.product_id,
                'quantity': item.quantity,
                'price': item.price
            }
            for item in order.items
        ]
    })

## API Gateway 实现
```python
python

# api_gateway/app.py
from flask import Flask, request, jsonify
import requests
import jwt
from functools import wraps

app = Flask(__name__)

SERVICES = {
    'user': 'http://user-service:5000',
    'product': 'http://product-service:5001',
    'order': 'http://order-service:5002'
}

def authenticate(f):
    """认证装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('Authorization')

        if not token:
            return jsonify({'error': 'Missing token'}), 401

        try:
            payload = jwt.decode(token.split()[1], 'your-secret-key', algorithms=['HS256'])
            request.user_id = payload['user_id']
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401

        return f(*args, **kwargs)

    return decorated_function

@app.route('/<service>/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(service, path):
    """代理请求到对应的服务"""
    if service not in SERVICES:
        return jsonify({'error': 'Service not found'}), 404

    service_url = SERVICES[service]
    url = f"{service_url}/{path}"

    headers = {key: value for key, value in request.headers if key != 'Host'}

    response = requests.request(
        method=request.method,
        url=url,
        headers=headers,
        json=request.get_json(),
        params=request.args
    )

    return jsonify(response.json()), response.status_code

@app.route('/health')
def health():
    """健康检查"""
    return jsonify({'status': 'healthy'})

```> >
### 2. 事件驱动架构
````python
`python

# 示例:设计事件驱动架构

用户请求:
"设计一个基于事件驱动的订单处理系统"

Claude Code 生成的架构设计:

## 系统架构

```> ┌──────────────┐
│   订单服务    │
│  (Order      │
│   Service)   │
└──────┬───────┘
│ 发布事件
▼
┌─────────────────────────────────────────────────────────────┐
│                    Message Broker                           │
│                    (Kafka/RabbitMQ)                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │ 订单创建  │  │ 订单支付  │  │ 订单发货  │  │ 订单完成  │  │
│  │ 事件     │  │ 事件     │  │ 事件     │  │ 事件     │  │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘  │
└─────────────────────────────────────────────────────────────┘
│              │              │              │
▼              ▼              ▼              ▼
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│ 库存服务  │   │ 支付服务  │   │ 物流服务  │   │ 通知服务  │
│ Inventory│   │ Payment  │   │ Shipping │   │ Notify   │
│ Service  │   │ Service  │   │ Service  │   │ Service  │
└──────────┘   └──────────┘   └──────────┘   └──────────┘

```
## 事件定义

# events/order_events.py
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any
import json
@dataclass
class OrderEvent:
"""订单事件基类"""
event_type: str
order_id: int
timestamp: datetime
data: Dict[str, Any]
def to_dict(self) -> dict:
"""转换为字典"""
return {
'event_type': self.event_type,
'order_id': self.order_id,
'timestamp': self.timestamp.isoformat(),
'data': self.data
}
def to_json(self) -> str:
"""转换为 JSON"""
return json.dumps(self.to_dict())
@dataclass
class OrderCreatedEvent(OrderEvent):
"""订单创建事件"""
def __init__(self, order_id: int, user_id: int, total_amount: float, items: list):

super().init( event_type='order.created', order_id=order_id, timestamp=datetime.utcnow(), data={ 'user_id': user_id, 'total_amount': total_amount, 'items': items } ) @dataclass class OrderPaidEvent(OrderEvent): """订单支付事件""" def init(self, order_id: int, payment_method: str, transaction_id: str): super().init( event_type='order.paid', order_id=order_id, timestamp=datetime.utcnow(), data={ 'payment_method': payment_method, 'transaction_id': transaction_id } ) @dataclass class OrderShippedEvent(OrderEvent): """订单发货事件""" def init(self, order_id: int, tracking_number: str, shipping_address: dict): super().init( event_type='order.shipped', order_id=order_id, timestamp=datetime.utcnow(), data={ 'tracking_number': tracking_number, 'shipping_address': shipping_address } ) @dataclass class OrderCompletedEvent(OrderEvent): """订单完成事件""" def init(self, order_id: int, delivery_time: datetime): super().init( event_type='order.completed', order_id=order_id, timestamp=datetime.utcnow(), data={ 'delivery_time': delivery_time.isoformat() } )

bash
## 事件发布者

python

services/order_service/event_publisher.py

import pika import json from events.order_events import OrderEvent

class EventPublisher: """事件发布者"""

bash
def __init__(self, rabbitmq_url: str = 'amqp://localhost'):
    self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
    self.channel = self.connection.channel()

    # 声明交换机
    self.channel.exchange_declare(
        exchange='order_events',
        exchange_type='topic'
    )

def publish(self, event: OrderEvent, routing_key: str):
    """发布事件"""
    self.channel.basic_publish(
        exchange='order_events',
        routing_key=routing_key,
        body=event.to_json(),
        properties=pika.BasicProperties(
            delivery_mode=2,  # 持久化
        )
    )

def close(self):
    """关闭连接"""
    self.connection.close()

事件消费者#

services/inventory_service/event_consumer.py

import pika import json import requests class InventoryEventConsumer: """库存事件消费者""" def init(self, rabbitmq_url: str = 'amqp://localhost'): self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url)) self.channel = self.connection.channel()

声明交换机和队列

self.channel.exchange_declare( exchange='order_events', exchange_type='topic' ) self.channel.queue_declare(queue='inventory_queue', durable=True) self.channel.queue_bind( exchange='order_events', queue='inventory_queue', routing_key='order.created' ) def handle_order_created(self, ch, method, properties, body): """处理订单创建事件""" event = json.loads(body) print(f"Received order.created event: {event}")

扣减库存

for item in event['data']['items']: response = requests.get( f"http://product-service:5001/products/{item['product_id']}" ) product = response.json() new_stock = product['stock'] - item['quantity'] requests.put( f"http://product-service:5001/products/{item['product_id']}/stock", json={'stock': new_stock} )

确认消息

ch.basic_ack(delivery_tag=method.delivery_tag) def start_consuming(self): """开始消费消息""" self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume( queue='inventory_queue', on_message_callback=self.handle_order_created ) print('Inventory service started consuming messages...') self.channel.start_consuming() def close(self): """关闭连接""" self.connection.close()

bash
## 事件驱动订单服务

python

services/order_service/app.py

from flask import Flask, request, jsonify from flask_sqlalchemy import SQLAlchemy from datetime import datetime from event_publisher import EventPublisher from events.order_events import OrderCreatedEvent

app = Flask(name) app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/orders_db'

db = SQLAlchemy(app) event_publisher = EventPublisher()

class Order(db.Model): """订单模型""" tablename = 'orders'

bash
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, nullable=False)
total_amount = db.Column(db.Float, nullable=False)
status = db.Column(db.String(20), default='pending')
created_at = db.Column(db.DateTime, server_default=db.func.now())
updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

class OrderItem(db.Model): """订单项模型""" tablename = 'order_items'

bash
id = db.Column(db.Integer, primary_key=True)
order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
product_id = db.Column(db.Integer, nullable=False)
quantity = db.Column(db.Integer, nullable=False)
price = db.Column(db.Float, nullable=False)

order = db.relationship('Order', backref='items')

@app.route('/orders', methods=['POST']) def create_order(): """创建订单""" data = request.get_json()

bash
# 创建订单
order = Order(
    user_id=data['user_id'],
    total_amount=0
)
db.session.add(order)
db.session.flush()

# 创建订单项
total_amount = 0
items = []
for item in data['items']:
    order_item = OrderItem(
        order_id=order.id,
        product_id=item['product_id'],
        quantity=item['quantity'],
        price=item['price']
    )
    db.session.add(order_item)
    total_amount += item['price'] * item['quantity']
    items.append({
        'product_id': item['product_id'],
        'quantity': item['quantity'],
        'price': item['price']
    })

order.total_amount = total_amount
db.session.commit()

# 发布订单创建事件
event = OrderCreatedEvent(
    order_id=order.id,
    user_id=data['user_id'],
    total_amount=total_amount,
    items=items
)
event_publisher.publish(event, 'order.created')

return jsonify({'order_id': order.id}), 201

if name == 'main': app.run(host='0.0.0.0', port=5002)

标记本节教程为已读

记录您的学习进度,方便后续查看。